package org.example.common;


import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author zhangjg
 * @desc Consumer
 * @date 2021/12/22 22:06
 **/
public class ConsumerP {

    private final static Logger log = LoggerFactory.getLogger(ConsumerP.class);

    public static void main(String[] args) throws Exception{
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        MessageListener myMessageListener = (consumer, msg) -> {
            try {
                log.info("Message received: {} " , new String(msg.getData()));
                consumer.acknowledge(msg);
            } catch (Exception e) {
                consumer.negativeAcknowledge(msg);
            }
        };

        Consumer consumer = client.newConsumer()
                .topic("persistent://public/default/tt11")
                .subscriptionName("my-subscription")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscriptionMode(SubscriptionMode.Durable)
                .subscriptionType(SubscriptionType.Shared)
                .messageListener(myMessageListener)
                .subscribe();


    }
}
